import holoviews as hv
hv.extension('bokeh')
hv.opts.defaults(hv.opts.Curve(width=500), 
                 hv.opts.Image(width=500, colorbar=True, cmap='Viridis'))
import numpy as np
import scipy.signal
import scipy.fft
from IPython.display import Audio

7. Sistemas para el procesamiento de señales

7.1. Introducción

7.1.1. Definición de sistema

Hasta ahora hemos realizado análisis de señales, es decir el estudio de las señales y sus propiedades en el dominio del tiempo y frecuencia

En esta unidad nos enfocaremos en el procesamiento de señales, es decir el diseño de sistemas que procesan una señal de entrada y producen una señal de salida

../../_images/system.png

Usaremos

  • \(x[n]\) para denotar la señal (discreta) de entrada y \(X[k]\) su espectro

  • \(y[n]\) para denotar la señal (discreta) de salida e \(Y[k]\) su espectro

7.1.2. Ejemplos de sistemas

Utilizando sistemas podemos modificar una señal para mejorar su calidad o remover efectos indeseados

  • Un sistema para reducir el ruido de una señal de electroencefalograma (EEG)

../../_images/system-denoise-eeg.png
  • Un sistema para mejorar una imagen fuera de foco (sharpening)

../../_images/system-sharpen.jpg
  • Un sistema para eliminar el eco de un audio

../../_images/system-echo.png

7.2. Propiedades y ejemplos de sistemas

7.2.1. Sistemas sin memoria

Diremos que un sistema \(\Phi\) es un sistema sin memoria si

\[ y[n] = \Phi(x[n]), \]

es decir que la salida del sistema en un instante \(n\) dado depende solo de la entrada en ese mismo instante

Veamos algunos ejemplos

Sistema atenuador/amplificador ideal

\[ y[n] = A x[n], \]

donde \(A>0\) se llama ganancia

Este sistema puede atenuar la entrada si \(0<A<1\) y amplificar si \(A>1\)

Sistema saturador (clamp)

\[\begin{split} y[n] = \begin{cases} B &x[n] > B \\x[n] & x[n] \in [-B, B]\\ -B & x[n] < -B\end{cases} \end{split}\]

Este sistema limita los valores de la señal de entrada en un rango fijo

Sistema rectificador

\[ y[n] = | x[n] | \]

Este sistema eliminar la parte negativa de la señal de entrada

7.2.2. Sistema Lineal

Diremos que un sistema \(\Phi\) es lineal si cumple con las siguientes propiedades

Homogeneidad

Un cambio en la amplitud de la entrada produce un cambio equivalente en la salida

\[ \Phi(cx[n]) = c \Phi(x[n]) = c y[n] \]

Aditividad

Señales que se suman en la entrada producen señales que se suman en la salida

\[ \Phi(x_1[n] + x_2[n]) = \Phi(x_1[n]) + \Phi(x_2[n]) = y_1[n] + y_2[n] \]

Es decir que las señales pasan por el sistema sin interactuar entre ellas

Otras propiedades de los sistemas lineales

Producto de las propiedades anteriores se tiene que una cascada de sistemas lineales forma un sistema lineal equivalente y además la cascada de sistemas es conmutativa, es decir que el orden de los sistemas en la cascada no altera el resultado final

La siguiente figura ejemplifica esta propiedad

../../_images/system-conmu.png

Otra propiedad interesante de los sistemas lineales es que cumplen el Principio de superposición:

  1. Si descomponemos una señal en \(M\) componentes: \(x[n] = x_1[n] + x_2[n] + \ldots + x_M[n]\)

  2. Y aplicamos un sistema lineal a cada componente: \(y_j[n] = \Phi(x_j[n])\)

  3. Podemos recuperar la salida total usando aditividad: \(y_1[n] + y_2[n] + \ldots + y_M[n] = y[n]\)

La siguiente figura ejemplifica esta propiedad

../../_images/system-superpos.png

7.2.3. Sistemas con memoria

Un sistema \(\Phi\) es un sistema con memoria si su salida actual depende sólo de la entrada actual, las entradas anteriores o las salidas anteriores

\[\begin{split} \begin{align} y[n] = \Phi(x[n], & x[n-1], x[n-2], \ldots, x[0], \\ \nonumber & y[n-1], y[n-2], \ldots, y[0]) \nonumber \end{align} \end{split}\]

esto también se conoce como sistema causal

Un sistema con memoria no-causal usa entradas futuras (es decir \(x[n+1]\), \(x[n+2]\), …) y por ende solo se puede implementar de forma offline, es decir una vez que sea ha observado toda la señal

A continuación veremos algunos ejemplos de sistemas con memoria causales

Sistema con un retardo (delay)

Definido como

\[ y[n] = x[n-m], \]

donde

  • la salida depende solo de “una” entrada anterior

  • el valor de m define que tan “antigua” es la entrada pasada

El delay o retarno no afecta la amplitud de los componentes frecuenciales de la señal pero si su fase, como muestra la siguiente figura

n = np.arange(0, 200, step=1)
x = lambda m: np.sin(2.0*np.pi*0.05*(n-m)) 
f = scipy.fft.rfftfreq(d=1, n=len(n))

p = []
for m in [0, 4, 8]:
    x_delayed = x(m)
    p.append(hv.Curve((n, x_delayed), 'Tiempo [s]', 'Señal'))
    X = scipy.fft.rfft(x_delayed)
    Xm = np.absolute(X) 
    p.append(hv.Curve((f, Xm), 'Frecuencia [Hz]', 'Espectro de amplitud'))
    Xp = np.angle(X)
    p.append(hv.Curve((f, Xm*Xp/np.amax(Xm)), 'Frecuencia [Hz]', 'Espectro de fase'))
hv.Layout(p).cols(3).opts(hv.opts.Curve(width=250, height=200))

Sistema reverberador o eco

Definido como

\[ y[n] = x[n] + A x[n-m], \]

donde

  • la salida depende de una entrada “pasada” y la entrada actual

  • la ganancia controla si el “eco” se atenua o amplifica

Al contrario del sistema anterior, el eco si puede modificar el espectro de amplitud.

Notemos el efecto de interferencia constructiva y destructiva al modificar el retardo, como muestra la siguiente animación

n = np.arange(0, 200, step=1)
x = lambda m, A=1: A*np.sin(2.0*np.pi*0.05*(n-m)) 
f = scipy.fft.rfftfreq(d=1, n=len(n))

buffer = {}
for m in range(0, 40, 2):
    xm = x(m)
    buffer[m] = (xm, np.abs(scipy.fft.rfft(x(0) + xm)))    
hMap1 = hv.HoloMap(kdims='m')
hMap2 = hv.HoloMap(kdims='m')
hMap3 = hv.HoloMap(kdims='m')

for m, (xm, X) in buffer.items():     
    hMap1[m] = hv.Curve((n, xm), 'Tiempo', 'x', label='Ax[n-m]')    
    hMap2[m] = hv.Curve((n, x(0) + xm), 'Tiempo', 'y')
    hMap3[m] = hv.Curve((f, X), 'Frecuencia', 'Espectro')

p_clean = hv.Curve((n, x(0)), 'Tiempo', label='x[n]').opts(width=500, height=200)
plot = (p_clean * hMap1 + hMap2 + hMap3).cols(1).opts(hv.opts.Curve(height=200), 
                                               hv.opts.Overlay(legend_position='top_right'))

hv.output(plot, holomap='gif', fps=5)

El siguiente video muestra un experimento que ejemplifica la interferencia destructiva en una onda mecánica: https://www.youtube.com/watch?v=IU8xeJlJ0mk

Sistemas con múltiples ecos

Pueden combinarse más retardos para hacer un sistema reverberante más complejo

Por ejemplo el siguiente sistema

\[ y[n] = x[n] + A_1 x[n-m_1] + A_2 x[n-m_2] + A_3 x[n-m_3] + \ldots, \]

que lo implementamos como

Fs = 44100; 
n = np.arange(0, 4, step=1.0/Fs) 
x = lambda m: np.sin(2.0*np.pi*880*(n-m))*np.exp(-(n-m)**2/0.5**2)*np.heaviside(n-m, 0)
y = x(0) + 0.5*x(1.) + 0.25*x(2.) + 0.125*x(3.)

Da como resultado gráfico lo siguiente:

hv.Curve((n, y), 'Tiempo [s]', 'Señal con eco')

y el resultado sonoro es:

Audio(y, rate=Fs, normalize=False)

7.3. Sistema de respuesta finita al impulso (FIR)

Generalizando el ejemplo de sistema lineal reverberante a \(L\) retardos llegamos a

\[\begin{split} \begin{align} y[n] &= h[0] x[n] + h[1] x[n-1] + h[2] x[n-2] + \ldots + h[L] x[n-L] \nonumber \\ &= \sum_{j=0}^{L} h[j] x[n-j] \nonumber \\ &= (h* x)[n] \nonumber \end{align} \end{split}\]

que se puede modelar como una convolución discreta entre \(h\) y \(x\)

Este sistema se conoce como

  • Sistema FIR (finite impulse response)

  • Sistema MA (moving average)

  • Sistema todo-zeros

y es de orden L (posee L+1 coeficientes)

Intepretación como media movil (MA)

El sistema FIR es equivalente a una media movil ponderada que se aplica sobre la entrada, donde los coeficientes del sistema son los ponderadores

Por ejemplo sea un sistema de 3 coeficientes \(h[0]=a\), \(h[1]=b\) y \(h[2]=c\)

\[\begin{split} \begin{align} y[n] = (h*x)[n] &= \sum_{j=0}^{2} h[j] x[n-j] \nonumber \\ &= a x[n] + b x[n-1] + c x[n-2] \nonumber \end{align} \end{split}\]

donde cada salida se calcula a partir de

\[ \overbrace{x[0], x[1], x[2]}^{y[2]} , x[3], x[4], \ldots \]
\[ x[0], \overbrace{x[1], x[2] , x[3]}^{y[3]}, x[4], \ldots \]
\[ x[0], x[1], \overbrace{x[2] , x[3], x[4]}^{y[4]}, \ldots \]

Un detalle es que para obtener el valor de \(y[0]\) e \(y[1]\) se deben establecer “condiciones de borde”, como por ejemplo \(x[-2] = x[-1]= 0\)

A continuación veremos algunos ejemplos de aplicaciones usando filtros FIR sencillos

7.3.1. Eliminando ruido blanco aditivo

Si tenemos una señal contaminada con ruido blanco aditivo como la que se muestra a continuación

np.random.seed(0)
n = np.arange(0, 100, step=1)
C = 5*np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/10**2)
# Señal limpia (lo que no conocemos)
x_clean = np.random.multivariate_normal(np.zeros_like(n), C) 
# Datos: Señal + ruido (lo que medimos)
x = x_clean + 2*np.random.randn(len(n))

hv.Scatter((n, x), 'Tiempo', 'Datos (señal + ruido)').opts(width=500, height=200)

Podemos usar un sistema FIR promediador para “suavizar la contaminación”

Sea un sistema FIR con \(L\) coeficientes idénticos e iguales a \(1/L\)

L = 10 
h = np.ones(shape=(L,))/L
y = scipy.signal.convolve(x, h, mode='same', method='auto')

La siguiente gráfica interactiva muestra el resultado de convolucionar este sistema con los datos contaminados

hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')

for m in range(0, 100-L, 5): 
    c = np.zeros_like(n, dtype=np.float64); 
    c[m:m+L] = h
    hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')    
    hMap2[m] = hv.Curve((n[:m], y[:m]), 'Tiempo', label='Señal filtrada')
    
p_clean = hv.Curve((n, x_clean), 'Tiempo', 'Señal', label='Señal limpia').opts(color='k', height=200)
(hMap1 + hMap2 * p_clean).cols(1).opts(hv.opts.Curve(height=200), 
                                       hv.opts.Overlay(legend_position='top_left'))

Nota

Este filtro promedia los datos vecinos resultando una versión suavizada de los mismos. Esta versión suavizada se aproxima a la “señal limpia” que está escondida en el ruido.

En general, mientras más “largo” sea el filtro mayor será el efecto de suavizado

7.3.2. Encontrando cambios en una señal

Sea la siguiente señal escalonada

n = np.arange(0, 100, step=1)
x = np.zeros_like(n, dtype=np.float64)
x[20:] += 1.
x[40:] += 1.
x[80:] -= 1.

hv.Curve((n, x), 'Tiempo', 'Datos').opts(width=500, height=200)

Si nos interesa encontrar cambios en la señal podemos usar un sistema diferenciador

h = np.array([0.5, -0.5])
y = scipy.signal.convolve(x, h, mode='same', method='auto')

La siguiente gráfica interactiva muestra el resultado de convolucionar este sistema con la señal escalonada

hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')

for m in range(0, 100-len(h), 5): 
    c = np.zeros_like(n, dtype=np.float64); 
    c[m:m+len(h)] = h
    hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')    
    hMap2[m] = hv.Curve((n[:m], y[:m]), 'Tiempo', 'Convolución')    

(hMap1 + hMap2).cols(1).opts(hv.opts.Curve(height=200))

Nota

Los pulsos en la convolución están asociados a un cambio (ascenso o descenso) en la señal original

En un caso más general, este filtro nos da información de la velocidad con que cambia la señal

7.3.3. Remover tendencias

En el siguiente ejemplo tenemos una señal sinusoidal “montada” sobre otra señal que cambia más lentamente.

np.random.seed(0); 
n = np.arange(0, 100, step=1)
C = np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/30**2)
x = np.sin(2.0*np.pi*0.1*n) + 5*np.random.multivariate_normal(np.zeros_like(n), C)

hv.Curve((n, x), 'Tiempo', 'Señal').opts(height=200)

En general nos referimos a la señal lenta como la “tendencia”

A continuación veremos un filtro para separar la parte rápida (sinusoide) de la parte lenta (tendencia) de este señal de ejemplo. En próximas lecciones veremos como diseñar este tipo de filtros

L = 5
h = -np.ones(shape=(L,))/L 
h[L//2] += 1

y = scipy.signal.convolve(x, h, mode='same', method='auto')
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')

for m in range(0, 100-len(h), 5): 
    c = np.zeros_like(n, dtype=np.float64); 
    c[m:m+len(h)] = h
    hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')    
    hMap2[m] = hv.Curve((n[0:m], y[0:m]), 'Tiempo', 'Convolución').opts(ylim=(-0.5, 0.5))
    
(hMap1 + hMap2).cols(1).opts(hv.opts.Curve(height=200))

7.4. Convolución con scipy

Podemos convolucionar una señal en Python usando

scipy.signal.convolve(in1, # Señal de entrada
                      in2, # Coeficientes del sistema
                      mode='full', 
                      method='auto'  
                     )

donde el argumento method puede ser

  • direct: Realiza la convolución en el dominio del tiempo

  • fft: Realiza la convolución multiplicando los espectros

  • auto: Se decide automaticamente en base al largo de las señales

y el argumento mode indica donde se hace la convolución

Para ejemplificar la influencia de este argumento consideremos una señal \(x=[a,b,c]\) y un sistema \(h=[d, e]\)

  • Si uso mode=valid el resultado será \(y=[ad+be, bd+ce]\)

  • Si uso mode=same el resultado será \(y=[ae, ad+be, bd+ce]\), es decir se agregan ceros al principio de \(x\) tal que \(y\) sea del mismo largo que \(x\)

  • Si uso mode=full el resultado será \(y=[ae, ad+be, bd+ce, cd]\), es decir se agregan ceros al principio y al final de \(x\)

7.4.1. Eliminando ruido versión 2.0

Considerando la misma señal contaminada con ruido blanco que vimos anteriormente utilizaremos scipy.signal.convolve para convolucionar con un filtro que suavice el ruido.

Probaremos dos sistemas FIR

  • coeficientes idénticos e iguales a \(1/L\), es decir una ventana rectangular

  • coeficientes que decaen suavemente a cero, como por ejemplo una ventana de Hamming

con distintos valores de \(L\) (largo del filtro)

En este caso los valores los obtenemos usando la función de scipy


scipy.signal.get_window(window, # String con el nombre de la ventana
                        Nx, # Entero con el largo de la ventana 
                        ...)

La ventana rectangular se llama boxcar mientras que la de Hamming se llama hamming. En la documentación de la función se puede revisar otras ventanas disponibles

# La señal de ejemplo de la sección 7.3.1
np.random.seed(0)
n = np.arange(0, 100, step=1)
C = 5*np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/10**2)
x_clean = np.random.multivariate_normal(np.zeros_like(n), C) 
x = x_clean + 2*np.random.randn(len(n))

# La filtramos con distintas funciones y L para luego visualizar el resultado
filters = {}
results = {}
for L in [5, 10, 20, 30, 40]:
    for filter_name in ['boxcar', 'hamming']:    
        h = scipy.signal.get_window(filter_name, L) 
        h = h/np.sum(h)
        if not L in filters:
            filters[L] = {}
            results[L] = {}
        filters[L][filter_name] = h
        results[L][filter_name] = scipy.signal.convolve(x, h, mode='same', method='auto')
hMap1 = hv.HoloMap(kdims='L')
hMap2 = hv.HoloMap(kdims='L')

for L, curves in filters.items():
    p = []
    for filter_name, h in curves.items():
        p.append(hv.Curve((range(L), h), 'Largo del filtro (L)', 'Filtro', label=filter_name))
    hMap1[L] = hv.Overlay(p)

for L, curves in results.items():
    p = []
    for filter_name, y in curves.items():
        p.append(hv.Curve((n, y), 'n', 'Señal', label=filter_name).opts(line_width=2, alpha=0.75))
    hMap2[L] = hv.Overlay(p)
    
p_clean = hv.Curve((n, x_clean), 'Tiempo', 'Señal', label='Señal limpia').opts(color='k')
p_data = hv.Scatter((n, x), 'Tiempo', 'Datos (señal + ruido)').opts(width=500, height=200)
(p_data + hMap1 + hMap2 * p_clean).cols(1).opts(hv.opts.Curve(height=250), 
                                       hv.opts.Overlay(legend_position='top_right'))

Nota

Mientras más larga es la ventana (L) mayor es el suavizado. Adicionalmente la ventana de Hamming produce un filtrado más suave que la rectangular

En la lección siguiente veremos como diseñar un filtro, es decir como calcular los valores de h para resolver una tarea en particular